/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples.kotlin.cookbook

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.Validation
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PDone
import org.apache.beam.sdk.values.TupleTag

/**
 * This example shows how to do a join on two collections. It uses a sample of the GDELT 'world
 * event' data (http://goo.gl/OB6oin), joining the event 'action' country code against a table that
 * maps country codes to country names.
 *
 * Concepts: Join operation; multiple input sources.
 *
 * To execute this pipeline locally, specify a local output file or output prefix on GCS:
 *
 * ```
 * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
 * ```
 *
 * To change the runner, specify:
 *
 * ```
 * --runner=YOUR_SELECTED_RUNNER
 * ```
 *
 * See examples/kotlin/README.md for instructions about how to configure different runners.
 */
object JoinExamples {

    // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
    private const val GDELT_EVENTS_TABLE = "clouddataflow-readonly:samples.gdelt_sample"

    // A table that maps country codes to country names.
    private const val COUNTRY_CODES = "gdelt-bq:full.crosswalk_geocountrycodetohuman"

    // Country name reported for events whose country code has no entry in the country table.
    private const val UNKNOWN_COUNTRY_NAME = "none"

    /**
     * Joins GDELT events with country names, using the country code as the join key.
     *
     * @param eventsTable event rows; each must carry an `ActionGeo_CountryCode` field (see
     *   [ExtractEventDataFn]).
     * @param countryCodes crosswalk rows with `FIPSCC` (code) and `HumanName` (name) fields (see
     *   [ExtractCountryInfoFn]).
     * @return one line per event, shaped like
     *   `Country code: <code>, Country name: <name>, Event info: <info>`. Events whose code has no
     *   entry in [countryCodes] are reported with the country name `none` instead of failing.
     */
    @Throws(Exception::class)
    internal fun joinEvents(
            eventsTable: PCollection<TableRow>, countryCodes: PCollection<TableRow>): PCollection<String> {

        val eventInfoTag = TupleTag<String>()
        val countryInfoTag = TupleTag<String>()

        // Key both input collections by country code so they can be co-grouped.
        val eventInfo = eventsTable.apply(ParDo.of(ExtractEventDataFn()))
        val countryInfo = countryCodes.apply(ParDo.of(ExtractCountryInfoFn()))

        // country code 'key' -> CGBKR (<event info>, <country name>)
        val kvpCollection = KeyedPCollectionTuple.of(eventInfoTag, eventInfo)
                .and(countryInfoTag, countryInfo)
                .apply(CoGroupByKey.create())

        // Process the CoGbkResult elements generated by the CoGroupByKey transform.
        // country code 'key' -> string of <event info>, <country name>
        val finalResultCollection = kvpCollection.apply(
                "Process",
                ParDo.of(
                        object : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                            @ProcessElement
                            fun processElement(c: ProcessContext) {
                                val e = c.element()
                                val countryCode = e.key
                                // Unlike the one-argument overload, getOnly(tag, default) does not
                                // throw when the code has no country entry; it still throws if the
                                // country table maps the same code more than once.
                                val countryName = e.value.getOnly(countryInfoTag, UNKNOWN_COUNTRY_NAME)
                                for (ei in e.value.getAll(eventInfoTag)) {
                                    // Combine information from both collections into one value.
                                    c.output(KV.of(countryCode, "Country name: $countryName, Event info: $ei"))
                                }
                            }
                        }))

        // Format each (country code, joined info) pair as a single output line.
        return finalResultCollection.apply(
                "Format",
                ParDo.of(
                        object : DoFn<KV<String, String>, String>() {
                            @ProcessElement
                            fun processElement(c: ProcessContext) {
                                val kv = c.element()
                                c.output("Country code: ${kv.key}, ${kv.value}")
                            }
                        }))
    }

    /**
     * Examines each row (event) in the input table. Outputs a KV whose key is the country code of
     * the event and whose value is a string encoding the event information.
     *
     * The country code is the join key and must be present: a row without `ActionGeo_CountryCode`
     * fails the cast below. The descriptive fields are optional and render as `null` when absent.
     */
    internal class ExtractEventDataFn : DoFn<TableRow, KV<String, String>>() {
        @ProcessElement
        fun processElement(c: ProcessContext) {
            val row = c.element()
            val countryCode = row["ActionGeo_CountryCode"] as String
            // A field missing from the row reads as null; a non-null `as String` cast would throw
            // on it, so read the informational fields as nullable (same as a Java (String) cast).
            val sqlDate = row["SQLDATE"] as String?
            val actor1Name = row["Actor1Name"] as String?
            val sourceUrl = row["SOURCEURL"] as String?
            val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl"
            c.output(KV.of(countryCode, eventInfo))
        }
    }

    /**
     * Examines each row (country info) in the input table. Outputs a KV whose key is the country
     * code (`FIPSCC`) and whose value is the country name (`HumanName`).
     */
    internal class ExtractCountryInfoFn : DoFn<TableRow, KV<String, String>>() {
        @ProcessElement
        fun processElement(c: ProcessContext) {
            val row = c.element()
            val countryCode = row["FIPSCC"] as String
            val countryName = row["HumanName"] as String
            c.output(KV.of(countryCode, countryName))
        }
    }

    /**
     * Options supported by [JoinExamples].
     *
     * Inherits standard configuration options.
     */
    interface Options : PipelineOptions {
        @get:Description("Path of the file to write to")
        @get:Validation.Required
        var output: String
    }

    /**
     * Entry point: reads both BigQuery tables, joins them with [joinEvents] and writes the result
     * as text to `--output`.
     */
    @Throws(Exception::class)
    @JvmStatic
    fun main(args: Array<String>) {
        // withValidation() returns a PipelineOptionsFactory.Builder, not an Options instance;
        // `as`(...) builds the typed, validated view (a plain Kotlin cast would fail at runtime).
        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(Options::class.java)
        val p = Pipeline.create(options)
        // The following two 'apply' calls create multiple inputs to our pipeline, one for each
        // of our two input sources.
        val eventsTable = p.apply(BigQueryIO.readTableRows().from(GDELT_EVENTS_TABLE))
        val countryCodes = p.apply(BigQueryIO.readTableRows().from(COUNTRY_CODES))
        val formattedResults = joinEvents(eventsTable, countryCodes)
        formattedResults.apply<PDone>(TextIO.write().to(options.output))
        p.run().waitUntilFinish()
    }
}
